MapReduce Combiner组件
1.Combiner是MR程序中Mapper和Reduce之外的一种组件 2.Combiner组件的父类就是Reducer 3.Combiner和Reducer之间的区别在于运行的位置 4.Reducer是每一个接收全局的Map Task 所输出的结果 5.Combiner是在MapTask的节点中运行
6.每一个map都会产生大量的本地输出,Combiner的作用就是对map输出的结果先做一次合并,以较少的map和reduce节点中的数据传输量
7.Combiner的存在就是提高当前网络IO传输的性能,也是MapReduce的一种优化手段。
8.实现自定义的Combiner 1.因为源码中的Combiner是继承于Reducer,我们使用自己的Combiner就需要继承Reducer并重写reduce方法 2.job中设置job.setCombinerClass(自定义Combiner的类.class)
ps:需要注意一个点就是:Combiner就是一次Reducer类中reduce方法的实现,所以这里的KV需要和Reducer的KV是一致的 实际开发一定是先实现Mapper之后,知道了KV,然后再根据需要实现自定义的Combiner中的KV
Combiner.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CombinerDemo extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//因为Combiner就相当于在Mapper实现了reduce方法
//所以逻辑和实际Reducer中的reduce方法一致
int sum = 0;
for (Text t : values) {
sum += Integer.parseInt(t.toString());
}
context.write(key, new Text(sum + ""));
}
}
WordCountCombiner.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountCombiner {
//实现MapReduce
/*1.实现Mapper端的逻辑
* KEYIN:文件中读取的偏移量-->LongWritable(固定的)
* VALUEIN:文件中实际读取的内容-->Text
* KEYOUT:Mapper处理完成后传递给Reducer中的KEYIN的数据类型-->不固定,根据需求来
* VALUEOUT:Mapper端处理完成后传递给Reducer中的VALUEIN的数据类型-->不固定,根据需求来
*
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
/* 进入Map处理逻辑之前会执行一次的方法
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
}
/*
* 需要实现Mapper端的处理逻辑
* key:是文件中数据的偏移量,数据类型是由泛型中定义得来的KEYIN
* value:是文件中实际的内容,数据类型是泛型中定义得来的VALUEIN
* context:将处理过后产生的KV,写成文件输出
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//第一种
String [] words = value.toString().split(" ");
for (String word : words) {
context.write(new Text(word), new Text("1"));
}
//第二种
/*
* 通过StringTokenizer类来进行拆分,默认是空格\t,\n,\r,\f
* 需要向 使用迭代器一样使用这个对象
*/
/* String line = value.toString();
//若文件中的数据分割方式较多,那么建议使用这个类进行拆分
//这个类实现了枚举迭代器,所以它提供了一些类似于迭代一样的获取数据方式
StringTokenizer st = new StringTokenizer(line);
while(st.hasMoreTokens()) {//返回是否还有分隔符-->即判断是否还有其他分隔符
//返回从当前位置到分隔符之间的字符串-->获取下一个元素
// st.nextToken();//取出元素
context.write(new Text(st.nextToken()), new Text("1"));
}*/
}
/*
* 在Map处理逻辑之后会执行一次,可以处理一些逻辑
*/
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
}
}
//实现Reducer端的逻辑
/*
* Reducer相当于对Mapper端处理过后的数据进行一个实际的处理业务
* KEYIN-->Mapper处理过后输出key的数据类型,由Mapper的泛型中第三个参数决定
* VALUE-->Mapper处理过后输出value的数据类型,由Mapper的泛型中第四个参数决定
* KEYOUT-->Reducer端处理完数据之后要写出key的数据类
* VALUEOUT-->Reducer处理完数据之后,要写出value的 数据类
*/
public static class MyReduce extends Reducer<Text, Text, Text, Text>{
/* 执行Reducer端先执行一次的方法
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
}
/*
*reduce方法是处理业务的核心逻辑
*key: 是从Mapper端处理完成后,产生key的数据
*values-->是从 Mapper端处理完成之后相同key的values的数据集合
*context-->写出实际 处理完成后KV的方法
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//因为Combiner就相当于在Mapper实现了reduce方法
//所以逻辑和实际Reducer中的reduce方法一致
int sum = 0;
for (Text t : values) {
sum += Integer.parseInt(t.toString());
}
context.write(key, new Text(sum + ""));
}
/*
* 执行完reduce方法执行的方法
*
*/
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
}
}
/**
* 实现job,完成作业配置
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取配置对象
Configuration conf = new Configuration();
//2.创建Job对象(创建作业)
/*
* 这个方法一共有两个参数版本
* getInstance(conf) --------直接传入配置对象
* getInstance(conf,"WordCountCombiner")---传入配置对象和类的名字
*/
Job job = Job.getInstance(conf);
//3.设置运行job的class
job.setJarByClass(WordCountCombiner.class);
//4.设置Mapper端的运行类和输出key,输出value的数据类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//5.读取数据来源
//这两个方法处理是一样的,只是最后一个参数不同,
/*
* FileInputFormat.addInputPath(job, new Path("input/data1"));
*
* add:证明只有 一个路径,
*
* FileInputFormat.setInputPaths(job, new Path("input/data1"));
* set证明后面是可变参数,多个
*
* 因为当前运行的是本地MR,所以数据是 从本地读取,若需要在集群中运行,这个位置的参数应该是args[0]
*/
//FileInputFormat.addInputPath(job, new Path("input/data1"));
FileInputFormat.setInputPaths(job, new Path("input/data1"));
//优化设置
//一般可以写分区设置,多文件输出设置,Combiner设置
/*
* 并不是所有job都适用于Combiner,只有操作满足结合规律才可以进行设置
* 如 求和,求最大值,topN 等可以使用Combiner
*
* Combiner不一定需要存在,只有数据量较大,需要做优化的时候可以使用
*/
job.setCombinerClass(CombinerDemo.class);
//6.社会Reducer端的运行类和输出key和输出value的数据类型
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//7.处理完文件之后输出的路径
//ps:因为当前运行的是本地MR,所以数据是写到本地的,若需要再集群中运行,这个位置的参数应该是args[1]
//数据是存储到HDFS中
FileOutputFormat.setOutputPath(job, new Path("output1"));
//8.提交作业
int isok = job.waitForCompletion(true)?0:-1;
System.exit(isok);
}
}
word.txt里面的内容